package com.bw.gmall.realtime.common.funciton;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bw.gmall.realtime.common.bean.TableProcessDim;
import com.bw.gmall.realtime.common.constant.Constant;
import com.bw.gmall.realtime.common.util.HbaseUtil;
import com.bw.gmall.realtime.common.util.RedisUtil;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.hadoop.hbase.client.AsyncConnection;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public abstract class  DimAsyncFunction <T>extends RichAsyncFunction<T, T> implements IDimAsync<T> {
    private StatefulRedisConnection<String, String> redisAsyncConnection;
    private AsyncConnection hBaseAsyncConnection;
    String table;
    public DimAsyncFunction(String table) {
        this.table = table;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        hBaseAsyncConnection = HbaseUtil.getHBaseAsyncConnection();
        redisAsyncConnection = RedisUtil.getRedisAsyncConnection();
    }

    @Override
    public void close() throws Exception {
        HbaseUtil.closeAsyncConnection(hBaseAsyncConnection);
        RedisUtil.closeRedisAsyncConnection(redisAsyncConnection);
    }
    @Override
    public void asyncInvoke(T t, ResultFuture<T> resultFuture) throws Exception {
        String dimId = getDimId(t);
        String redisKey = RedisUtil.getRedisKey(table, dimId);
        //先查redis
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                String dimInfo = null;
                RedisFuture<String> stringRedisFuture = redisAsyncConnection.async().get(redisKey);
                try {
                    dimInfo = stringRedisFuture.get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                return dimInfo;
            }
        }).thenApplyAsync(new Function<String, JSONObject>() {
            @Override
            public JSONObject apply(String dimInfo) {
                JSONObject jsonObject = null;
                // redis没有该值
                if (dimInfo == null || dimInfo.length() == 0) {
                    try {
                        jsonObject = HbaseUtil.getAsyncCells(hBaseAsyncConnection, Constant.HBASE_NAMESPACE, table, dimId);
                        // 放到redis,进行缓存
                        redisAsyncConnection.async().setex(redisKey, 24 * 3600, jsonObject.toJSONString());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }else{
                    jsonObject = JSON.parseObject(dimInfo);
                }
                return jsonObject;
            }
        }).thenAccept(new Consumer<JSONObject>() {
            @Override
            public void accept(JSONObject dimInfo) {
                if (dimInfo != null) {
                    setDim(t,dimInfo);
                } else {
                    System.out.println("没有查到维度数据:" + table + ":" + dimId);
                }
                // 返回结果
                resultFuture.complete(Collections.singletonList(t));
            }
        });
    }


}
